-
Notifications
You must be signed in to change notification settings - Fork 406
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: merge predicate for concurrent writes #2291
Conversation
@Blajda :) |
I have found another bug: After the successful merge, the merge operation will return a DeltaTable, for this the new commit will be merged in the old snapshot, but the concurrent commit is not included in it, which results in a version mismatch error. I'll try to recreate this in an integration test. |
@JonasDev1 yup it's a known issue and there is a PR open for it in the write operation: #2280, but we probably need to address the commit advancement across the board |
I have added some merge concurrency tests where the version mismatch error also occurs |
133bb69
to
227dcd3
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
The Delta protocol doesn't specify anything regarding conflicting resolution so I think this is okay.
Some of the details might change if we need to inter-op with other implementations.
Description
This merge request will change the commitInfo.operationParameters.predicate of the merge operation. This is required to allow conflict checking on concurrent commits. Before, the predicate was just the simple merge predicate like
source.eventId = target.eventId
, but the conflict checker doesn't know these aliases and doesn't have access to the source df.So I now use the early_filter, which is also used before to filter a subset of the target table. Basically, the early_filter only contains static filters and partition filters that are converted to fixed values based on the source data. The early_filter can be None if no column/partition pre-filtering is possible, or if the merge contains a not_match_source operation. (See the generalize_filter function in the file).
The commitInfo predicate uses exactly this filter, except that the target alias is removed.
The predicate is used by the conflict checker, for example when there are multiple concurrent merges. If there is a predicate, the conflict checker will check whether the concurrent commit wrote or deleted data within that predicate. If the predicate is None, the conflict checker will treat the commit as a `read_whole_table' and interpret any concurrent updates as a conflict.
Example:
Target table with partition country
Merge with predicate
source.id = target.id AND target.country='DE'
-> CommitInfo predicatecountry='DE'
Merge with predicate
source.id = target.id AND target.country=source.country
-> CommitInfo predicatecountry='DE' OR country='US'
Merge with predicate
source.id = target.id
-> CommitInfo predicate None (As full target table join is required)Related Issue(s)